package kafka

import (
	"time"

	"gitee.com/chenhonghua/ginorigin/log"
	"github.com/Shopify/sarama"
)

var (
	kafkaConfig  KafkaConfig
	kafkaSetting *sarama.Config
	producer     sarama.SyncProducer
	consumer     sarama.Consumer
	client       sarama.Client
)

type KafkaConfig struct {
	Enable           bool     `mapstructure:"enable" json:"enable" yaml:"enable"`                                 // 是否开启
	Partition        int32    `mapstructure:"partition" json:"partition" yaml:"partition"`                        // 分区
	EncoderKey       string   `mapstructure:"encoder-key" json:"encoderKey" yaml:"encoder-key"`                   // 编码key
	BootstrapServers []string `mapstructure:"bootstrap-servers" json:"bootstrapServers" yaml:"bootstrap-servers"` // kafka节点服务器
}

func (c KafkaConfig) Load() {
	kafkaConfig = c
	if !kafkaConfig.Enable {
		return
	}
	log.Debugf("开启kafka模块:%v\n", kafkaConfig)
	kafkaSetting = sarama.NewConfig()
	kafkaSetting.Producer.RequiredAcks = sarama.WaitForAll
	kafkaSetting.Producer.Return.Successes = true
	kafkaSetting.Producer.Partitioner = sarama.NewRandomPartitioner
	kafkaSetting.Consumer.Offsets.AutoCommit.Enable = true
	kafkaSetting.Consumer.Offsets.AutoCommit.Interval = time.Duration(1) * time.Second
	kafkaSetting.Consumer.Offsets.Initial = sarama.OffsetNewest
	var err error
	client, err = sarama.NewClient(kafkaConfig.BootstrapServers, kafkaSetting)
	if err != nil {
		log.Fatal(err)
	}
	producer, err = sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	consumer, err = sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
}
